package com.czxy

import java.util.Properties

import com.czxy.bean.{HBaseMeta, TagRule}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction

/**
 * Spark job that derives a "marriage" tag for every user and merges it into the per-user
 * tag list kept in HBase.
 *
 * Steps performed by [[main]]:
 *  1. read the tag definitions from the MySQL table `tbl_basic_tag`;
 *  2. parse the rule of the fourth-level tag (id = 75) into the HBase connection metadata;
 *  3. load the fifth-level tags (pid = 75) as `rule -> tag id` pairs;
 *  4. read the user data from HBase and map each user's `marriage` value to a tag id;
 *  5. merge the new tag ids with the tag ids already stored in the result table and write
 *     the merged rows back.
 */
object MarriageTags {

  /** Name of the custom Spark data source used for every HBase read and write. */
  private val HBaseSourceFormat = "com.czxy.tools.HBaseDataSource"

  /** Id of the fourth-level tag; the fifth-level tags reference it through `pid`. */
  private val FourthLevelTagId = 75

  /** Location and layout of the HBase table that stores the merged tags. */
  private val ResultTable = "test"
  private val ResultFamily = "detail"
  private val ResultFields = "userId,tagsId"

  /**
   * Placeholder that was never implemented: calling it throws `NotImplementedError`.
   * Kept only so that existing references keep compiling; [[toHbaseMeta]] is what `main` uses.
   */
  def getHBaseMeta(fourMap: Any): HBaseMeta = ???

  /**
   * Job entry point. Builds a local Spark session, runs the tagging pipeline and always
   * stops the session afterwards, even when the pipeline fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MarriageTracker").master("local[*]").getOrCreate()
    try run(spark)
    finally spark.stop()
  }

  /** Runs the whole read / match / merge / write pipeline on the given session. */
  private def run(spark: SparkSession): Unit = {
    // Spark SQL built-in functions (col, udf, coalesce).
    import org.apache.spark.sql.functions._

    // NOTE(review): connection URL and credentials are hard-coded; consider externalising them.
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    val tagTable: DataFrame = spark.read.jdbc("jdbc:mysql://bd001:3306/tags_new", "tbl_basic_tag", prop)

    // Fourth-level tag: its rule carries the HBase connection settings as "k=v##k=v##...".
    val ruleMap: Map[String, String] = tagTable
      .select("id", "rule")
      .where(s"id=$FourthLevelTagId")
      .collect()
      .headOption
      .flatMap(row => Option(row.getAs[Any]("rule")))
      .map(rule => parseRule(rule.toString))
      .getOrElse(throw new IllegalStateException(
        s"No rule found in tbl_basic_tag for fourth-level tag id=$FourthLevelTagId"))

    // Wrap the fourth-level rule in the HBaseMeta case class.
    val hBaseMeta: HBaseMeta = toHbaseMeta(ruleMap)

    // Fifth-level tags: one (tag id, rule) pair per child of the fourth-level tag.
    val tagRules: List[TagRule] = tagTable
      .select("id", "rule")
      .where(s"pid=$FourthLevelTagId")
      .collect()
      .map(row => TagRule(row.getAs[Any]("id").toString.toInt, row.getAs[Any]("rule").toString))
      .toList

    // rule -> tag id; when two tags share a rule the later one wins, as with a linear scan
    // that keeps overwriting the result.
    val tagIdByRule: Map[String, Int] = tagRules.map(tag => tag.rule -> tag.id).toMap

    // Maps a user's marriage value to its tag id.
    // NOTE(review): unmatched or null values still yield 0, which ends up in the stored tag
    // list as "0" — confirm whether such rows should be skipped instead.
    val marriageToTagId: UserDefinedFunction =
      udf((marriage: String) => Option(marriage).flatMap(tagIdByRule.get).getOrElse(0))

    // User data from the HBase table described by the fourth-level rule.
    val users: DataFrame = spark.read.format(HBaseSourceFormat)
      .options(hbaseOptions(hBaseMeta, hBaseMeta.hbaseTable, hBaseMeta.family, hBaseMeta.selectFields))
      .load()

    val newTags: DataFrame = users.select(
      col("id").as("userId"),
      marriageToTagId(col("marriage")).as("tagsId")
    )

    val mergeTagsUdf: UserDefinedFunction =
      udf((oldIds: String, newIds: String) => mergeTags(oldIds, newIds))

    // Tags already stored for each user (append-style write: read, merge, write back).
    val oldTags: DataFrame = spark.read.format(HBaseSourceFormat)
      .options(hbaseOptions(hBaseMeta, ResultTable, ResultFamily, ResultFields))
      .load()

    oldTags.show(20)

    // Full outer join: a user that exists on only one side must still be written. An inner
    // join would silently drop every user that has no row in the result table yet.
    val joined: DataFrame = oldTags.join(newTags, oldTags("userId") === newTags("userId"), "full")
    val mergedTags: DataFrame = joined.select(
      // Take the user id from whichever side has it.
      coalesce(oldTags("userId"), newTags("userId")).as("userId"),
      // Combine the old and the new tag ids into one de-duplicated list.
      mergeTagsUdf(oldTags("tagsId"), newTags("tagsId").cast("string")).as("tagsId")
    )

    // Write the final result back to the result table.
    mergedTags.write.format(HBaseSourceFormat)
      .options(hbaseOptions(hBaseMeta, ResultTable, ResultFamily, ResultFields))
      .save()
  }

  /**
   * Parses a tag rule of the form `k1=v1##k2=v2##...` into a map.
   *
   * Each pair is split on the first `=` only, so values may themselves contain `=`.
   * Fragments without any `=` are skipped instead of failing.
   */
  private def parseRule(rule: String): Map[String, String] =
    rule.split("##").iterator
      .map(_.split("=", 2))
      .collect { case Array(key, value) => key -> value }
      .toMap

  /**
   * Merges two comma-separated tag id lists into one list without duplicates.
   *
   * `null` and empty inputs count as "no tags"; empty elements are dropped. The order of
   * first appearance is kept, old ids first.
   *
   * @return the merged list, or `""` when neither side holds any tag id
   */
  private def mergeTags(oldIds: String, newIds: String): String =
    Seq(oldIds, newIds)
      .flatMap(ids => Option(ids))
      .flatMap(_.split(","))
      .filter(_.nonEmpty)
      .distinct
      .mkString(",")

  /**
   * Builds the option map understood by the HBase data source.
   *
   * The ZooKeeper settings always come from `meta`; table, column family and selected
   * fields are passed explicitly because the result table differs from the source table.
   */
  private def hbaseOptions(meta: HBaseMeta,
                           table: String,
                           family: String,
                           selectFields: String): Map[String, String] =
    Map(
      "zkHosts" -> meta.zkHosts,
      HBaseMeta.ZKPORT -> meta.zkPort,
      HBaseMeta.HBASETABLE -> table,
      HBaseMeta.FAMILY -> family,
      HBaseMeta.SELECTFIELDS -> selectFields
    )

  /**
   * Converts the parsed fourth-level rule into an [[HBaseMeta]].
   *
   * @param ruleMap key/value pairs taken from the rule string
   * @return the metadata object; every key missing from `ruleMap` becomes an empty string
   */
  def toHbaseMeta(ruleMap: Map[String, String]):HBaseMeta = {
    def field(key: String): String = ruleMap.getOrElse(key, "")

    HBaseMeta(
      field(HBaseMeta.INTYPE),
      field(HBaseMeta.ZKHOSTS),
      field(HBaseMeta.ZKPORT),
      field(HBaseMeta.HBASETABLE),
      field(HBaseMeta.FAMILY),
      field(HBaseMeta.SELECTFIELDS),
      field(HBaseMeta.ROWKEY)
    )
  }
}
